{
  "metadata" : {
    "id" : "5297db27-a8c6-40dc-b442-d4b6f279e3bb",
    "name" : "streaming_weblogs",
    "user_save_timestamp" : "1970-01-01T01:00:00.000Z",
    "auto_save_timestamp" : "1970-01-01T01:00:00.000Z",
    "language_info" : {
      "name" : "scala",
      "file_extension" : "scala",
      "codemirror_mode" : "text/x-scala"
    },
    "trusted" : true,
    "sparkNotebook" : null,
    "customLocalRepo" : null,
    "customRepos" : null,
    "customDeps" : null,
    "customImports" : null,
    "customArgs" : null,
    "customSparkConf" : null,
    "customVars" : null
  },
  "cells" : [ {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : true,
      "id" : "A2D4928A0946433492E0431FB9AD1D66"
    },
    "cell_type" : "markdown",
    "source" : "# Simple Weblog Analytics - The Streaming Way\nIn this notebook, we are going to explore the weblog use case using the stream 'as it happens'.\n\nThis notebook requires a local `TCP ` server that simulates the Web server sending data.  \n\nPlease start the [weblogs_TCP_Server](./weblogs_TCP_server.snb.ipynb) notebook before running this one."
  }, {
    "metadata" : {
      "id" : "66DD67FAD0C24E00965CD26C0858D46B"
    },
    "cell_type" : "markdown",
    "source" : "## To connect to a TCP source, we need the host and the port of the TCP server.\nHere we use the defaults used in the `weblogs_TCP_server` notebook. If you changed these parameters there, change them here accordingly"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "35F22D17E25940618FFE838C99405B16"
    },
    "cell_type" : "code",
    "source" : [ "val host = \"localhost\"\n", "val port = 9999" ],
    "outputs" : [ ]
  }, {
    "metadata" : {
      "id" : "C0E80D55C81340318BD2CD304E322F5B"
    },
    "cell_type" : "markdown",
    "source" : "## We use the `TextSocketSource` in Structured Streaming to connect to the TCP server and consume the text stream.\nThis `Source` is called `socket` as the short name we can use as `format` to instantiate it.\n\nThe options needed to configure the `socket` `Source` are `host` and `port` to provide the configuration of our TCP server."
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "1356EE7F9684454E89FA1F1F8D5CAA95"
    },
    "cell_type" : "code",
    "source" : [ "val stream = sparkSession.readStream\n", "  .format(\"socket\")\n", "  .option(\"host\", host)\n", "  .option(\"port\", port)\n", "  .load()" ],
    "outputs" : [ ]
  }, {
    "metadata" : {
      "id" : "87702285726047CD8F3BE8FE0471C863"
    },
    "cell_type" : "markdown",
    "source" : "## We define a schema for the data in the logs\nFollowing the formal description of the dataset (at: [NASA-HTTP](http://ita.ee.lbl.gov/html/contrib/NASA-HTTP.html) ), the log is structured as follows:\n\n>The logs are an ASCII file with one line per request, with the following columns:\n- host making the request. A hostname when possible, otherwise the Internet address if the name could not be looked up.\n- timestamp in the format \"DAY MON DD HH:MM:SS YYYY\", where DAY is the day of the week, MON is the name of the month, DD is the day of the month, HH:MM:SS is the time of day using a 24-hour clock, and YYYY is the year. The timezone is -0400.\n- request given in quotes.\n- HTTP reply code.\n- bytes in the reply.\n\nThe dataset provided for this exercise offers this data in JSON format"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "727B6082C6C24A39B0D651583E62BFBC"
    },
    "cell_type" : "code",
    "source" : [ "import java.sql.Timestamp\n", "case class WebLog(host:String, \n", "                  timestamp: Timestamp, \n", "                  request: String, \n", "                  http_reply:Int, \n", "                  bytes: Long\n", "                 )" ],
    "outputs" : [ ]
  }, {
    "metadata" : {
      "id" : "52B745BDE1A44B928CF66DB4118589A4"
    },
    "cell_type" : "markdown",
    "source" : "##We convert the raw data to structured logs\nIn the batch analytics case we could load the data directly as JSON records. In the case of the `Socket` source, that data is received as plain text.\nTo transform our raw data to `WebLog` records, we first require a schema. The schema provides the necessary information to parse the text to a JSON object. It's the 'structure' when we talk about 'structured'  streaming.\n\nAfter defining a schema for our data, we will:\n\n- Transform the text `value` to JSON using the JSON support built in the structured API of Spark\n- Use the `Dataset` API to transform the JSON records to `WebLog` objects\n\nAs result of this process, we will obtain a `Streaming Dataset` of `WebLog` records."
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "DADD984537CC4AE3816E2BA42480F969"
    },
    "cell_type" : "code",
    "source" : [ "val webLogSchema = Encoders.product[WebLog].schema " ],
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "C39C1D8FE59242F58953E8F72E95E2AD"
    },
    "cell_type" : "code",
    "source" : [ "val jsonStream = stream.select(from_json($\"value\", webLogSchema) as \"record\")" ],
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "62E7E2F5CDC04C1185FD779141748500"
    },
    "cell_type" : "code",
    "source" : [ "val webLogStream: Dataset[WebLog] = jsonStream.select(\"record.*\").as[WebLog]" ],
    "outputs" : [ ]
  }, {
    "metadata" : {
      "id" : "7BF49320BEDD4896882407FDF6CD3E37"
    },
    "cell_type" : "markdown",
    "source" : "## We have a structured stream.\nThe `webLogStream` we just obtained is of type `Dataset[WebLog]` like we had in the batch analytics job.\nThe difference between this instance and the batch version is that `webLogStream` is a streaming `Dataset`.\n\nWe can observe this by querying the object.\n"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "E18B9BF47F9644EE8EC52885827EF4EE"
    },
    "cell_type" : "code",
    "source" : [ "webLogStream.isStreaming" ],
    "outputs" : [ ]
  }, {
    "metadata" : {
      "id" : "41E1A019CAEE4976B484B8A5D59D100E"
    },
    "cell_type" : "markdown",
    "source" : "## Operations on Streaming Datasets\nAt this point in the batch job, we were creating the first query on our data: How many records are contained in our dataset?\nThis is a question that we can answer easily when we have access to all the data. But how to count records that are constantly arriving? \nThe answer is that some operations we consider usual on a static `Dataset`, like counting all records, do not have a defined meaning on a streaming Dataset.\n\nAs we can observe, attempting to execute the `count` query below will result in an `AnalysisException`. Queries in Structured Streaming are a continuous operation that needs to be scheduled. To start scheduling queries on a stream, we use the `writeStream.start()` operation. "
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "47C12878560C49089D4E4742729D9694"
    },
    "cell_type" : "code",
    "source" : [ "// expect this call to fail!\n", "val count = webLogStream.count()" ],
    "outputs" : [ ]
  }, {
    "metadata" : {
      "id" : "4288478AA31B49928541E3795ED7217F"
    },
    "cell_type" : "markdown",
    "source" : "## What are popular URLs? In what timeframe?\n\nNow that we have immediate analytic access to the stream of weblogs we don't need to wait for a day or a month to have a rank of the popular URLs. We can have that information as trends unfold on much shorter windows of time.\n\nTo define the period of our interest, we create a window over some timestamp. An interesting feature of Structured Streaming is that we can define that window on the timestamp when the data was produced, also known as 'event time' as opposed to the time when the data is processed.\n\nOur window definition is of 5 minutes of event data. Given that the TCP Server is replaying the logs in a simulated timeline, the 5 minutes might happen much faster or slower than the clock time. In this way, we can appreciate how Structured Streaming uses the timestamp information in the events to keep track of the event timeline.\n\nAs we learned from the batch analytics, we should extract the URLs and only select content pages, like `html`, `htm`, or directories. Let's apply that acquired knowledge first before proceeding to define our `window` query."
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "A033C0056A0F478B806143935073E3E4"
    },
    "cell_type" : "code",
    "source" : [ "// A regex expression to extract the accessed URL from weblog.request \n", "val urlExtractor = \"\"\"^GET (.+) HTTP/\\d.\\d\"\"\".r\n", "val allowedExtensions = Set(\".html\",\".htm\", \"\")\n", "\n", "val contentPageLogs: String => Boolean = url => {\n", "  val ext = url.takeRight(5).dropWhile(c => c != '.')\n", "  allowedExtensions.contains(ext)\n", "}\n", "\n", "val urlWebLogStream = webLogStream.flatMap{ weblog => \n", "  weblog.request match {                                        \n", "    case urlExtractor(url) if (contentPageLogs(url)) => Some(weblog.copy(request = url))\n", "    case _ => None\n", "  }\n", "}" ],
    "outputs" : [ ]
  }, {
    "metadata" : {
      "id" : "008F6271D01845028313B5DA01D7E2BB"
    },
    "cell_type" : "markdown",
    "source" : "## Top Content Pages Query\nWe have converted the request to only contain the visited URL and filtered out all non-content pages. \nWe will now define the windowed query to compute the top trending URLs "
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "00D498E82EBB42DD8EB660C40F15D8F0"
    },
    "cell_type" : "code",
    "source" : [ "val rankingURLStream = urlWebLogStream.groupBy($\"request\", window($\"timestamp\", \"5 minutes\", \"1 minute\")).count()" ],
    "outputs" : [ ]
  }, {
    "metadata" : {
      "id" : "D43F6DAFECBB416F9B5C73A9D16B6438"
    },
    "cell_type" : "markdown",
    "source" : "## Start the stream processing\nAll the steps we have followed so far have been to define the process that the stream will undergo but no data has been processed yet. \n\nTo start a Structured Streaming job, we need to specify a `sink` and an `output mode`. \nThese are two new concepts introduced by Structured Streaming.\n\nA `sink` defines where we want to materialize the resulting data, like to a file in a file system, to an in-memory table or to another streaming system such as Kafka.\nThe `output mode` defines how we want the results to be delivered: Do we want to see all data every time, only updates or just the new records? \n\nThese options are given to a `writeStream` operation that creates the streaming query that starts the stream consumption, materializes the computations \ndeclared on the query and produces the result to the output `sink`.\n\nWe will visit all these concepts in detail later on. For now, we will use them empirically and observe the results.\n\nFor our query, we will use the `memory` `sink` and output mode `complete` to have a fully updated table each time new records are added to the result of keeping track of the URL ranking."
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "B01C2A1937204F6E8F001BF0ABFD0BDB"
    },
    "cell_type" : "code",
    "source" : [ "val query = rankingURLStream.writeStream\n", "  .queryName(\"urlranks\")\n", "  .outputMode(\"complete\")\n", "  .format(\"memory\")\n", "  .start()" ],
    "outputs" : [ ]
  }, {
    "metadata" : {
      "id" : "8FB679DD8DD045E69BEF5DC74FB427E2"
    },
    "cell_type" : "markdown",
    "source" : "### The memory sink outputs the data to a temporary table of the same name given in the queryName option."
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "F9F3A081AA0B4ED188828F2025139163"
    },
    "cell_type" : "code",
    "source" : [ "sparkSession.sql(\"show tables\").show()" ],
    "outputs" : [ ]
  }, {
    "metadata" : {
      "id" : "95836A339A17498C8A35459222AE3BDF"
    },
    "cell_type" : "markdown",
    "source" : "## Exploring the Data\nThe `memory` `sink` outputs the data to a temporary table of the same name given in the `queryName` option. We can create a `DataFrame` from that table to explore the results of the stream process. \n"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "7BDA487B44AD4AFF830A9F7DF469B764"
    },
    "cell_type" : "code",
    "source" : [ "val urlRanks = sparkSession.sql(\"select * from urlranks\")" ],
    "outputs" : [ ]
  }, {
    "metadata" : {
      "id" : "20C25E20A8354E0E8F21FDDBE666A6F0"
    },
    "cell_type" : "markdown",
    "source" : "### Before we can see any materialized results, we need to wait for the window to complete.\nGiven that we are accelerating the log timeline on the producer side, after few seconds, we can execute the next command to see the result of the first windows."
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "A7AB127108814D1A9E76FC49E8893140"
    },
    "cell_type" : "code",
    "source" : [ "urlRanks.select($\"request\", $\"window\", $\"count\").orderBy(desc(\"count\"))" ],
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : true,
      "id" : "3BAF4A0860EE44548BD2D1D18882F142"
    },
    "cell_type" : "code",
    "source" : [ "" ],
    "outputs" : [ ]
  } ],
  "nbformat" : 4
}